#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import parent
from storagelib import *
from testlib import *


storaged_debug_service = """
[Unit]
Description=Storaged
Documentation=man:storaged(8)

[Service]
Type=dbus
BusName=org.storaged.Storaged
ExecStart=/usr/lib/storaged/storaged
"""


class TestStorageMounting(StorageCase):

    def testMounting(self):
        m = self.machine
        b = self.browser

        mount_point_foo = "/run/foo"
        mount_point_bar = "/run/bar"

        self.login_and_go("/storage")

        # Add a disk
        m.add_disk("50M", serial="MYDISK")
        b.wait_in_text("#drives", "MYDISK")
        b.click('#drives .sidepanel-row:contains("MYDISK")')
        b.wait_visible('#storage-detail')

        # Format it

        self.content_row_action(1, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("name", "FILESYSTEM")
        self.dialog_set_val("mount_point", "")
        self.dialog_apply()
        self.dialog_wait_error("mount_point", "Mount point cannot be empty")
        self.dialog_set_val("mount_point", mount_point_foo)
        self.dialog_apply()
        self.dialog_wait_close()
        self.content_row_wait_in_col(1, 2, "ext4 filesystem")
        self.content_tab_wait_in_info(1, 1, "Name", "FILESYSTEM")
        self.wait_in_configuration("/dev/sda", "fstab", "dir", mount_point_foo)

        self.content_tab_wait_in_info(1, 1, "Mount point", mount_point_foo)

        self.content_dropdown_action(1, "Unmount")
        self.content_tab_wait_in_info(1, 1, "Mount point", "The filesystem is not mounted")

        self.content_tab_info_action(1, 1, "Name")
        self.dialog({"name": "filesystem"})
        self.content_tab_wait_in_info(1, 1, "Name", "filesystem")

        self.dialog_with_retry(trigger=lambda: self.content_tab_info_action(1, 1, "Mount point", wrapped=True),
                               expect={"mount_point": mount_point_foo},
                               values={"mount_point": mount_point_bar})
        self.wait_in_configuration("/dev/sda", "fstab", "dir", mount_point_bar)
        self.content_tab_wait_in_info(1, 1, "Mount point", mount_point_bar)

        self.content_row_action(1, "Mount")
        self.confirm()
        self.wait_mounted(1, 1)

        # Set the "Never unlock at boot option"
        self.content_tab_info_action(1, 1, "Mount point")
        self.dialog({"mount_options.never_auto": True})
        self.assertIn("noauto", m.execute(f"findmnt -s -n -o OPTIONS {mount_point_bar}"))
        self.assertIn("x-cockpit-never-auto", m.execute(f"findmnt -s -n -o OPTIONS {mount_point_bar}"))

        # Go to overview page and check that the filesystem usage is
        # displayed correctly.

        def wait_ratio_in_range(sel, low, high):
            b.wait_js_func("""(function (sel, low, high) {
              var text = ph_text(sel);
              var match = text.match('([0-9.]+) / ([0-9]+)');
              if (!match)
                return false;
              var ratio = parseFloat(match[1]) / parseFloat(match[2]);
              return low <= ratio && ratio <= high;
            })""", sel, low, high)

        b.go("#/")
        b.wait_in_text("table[aria-label=Filesystems]", mount_point_bar)
        bar_selector = f'table[aria-label=Filesystems] tr:contains("{mount_point_bar}") td:nth-child(4)'
        wait_ratio_in_range(bar_selector, 0.0, 0.1)
        m.execute(f"dd if=/dev/zero of={mount_point_bar}/zero bs=1M count=30 status=none")
        wait_ratio_in_range(bar_selector, 0.5, 1.0)
        m.execute(f"rm {mount_point_bar}/zero")
        wait_ratio_in_range(bar_selector, 0.0, 0.1)
        b.click('table[aria-label=Filesystems] tr:contains("%s")' % mount_point_bar)
        b.wait_visible("#storage-detail")

        def wait_info_field_value(name, value):
            return b.wait_text(f'#detail-header dt:contains("{name}") + dd', value)

        wait_info_field_value("Serial number", "MYDISK")

    def testMountingHelp(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Add a disk
        m.add_disk("50M", serial="MYDISK")
        b.wait_in_text("#drives", "MYDISK")
        b.click('#drives .sidepanel-row:contains("MYDISK")')
        b.wait_visible('#storage-detail')

        # Format it

        self.content_row_action(1, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("name", "FILESYSTEM")
        self.dialog_set_val("mount_point", "/run/foo")
        self.dialog_apply()
        self.dialog_wait_close()
        self.content_row_wait_in_col(1, 2, "ext4 filesystem")
        self.content_tab_wait_in_info(1, 1, "Name", "FILESYSTEM")
        self.content_tab_wait_in_info(1, 1, "Mount point", "/run/foo")

        fsys_tab = self.content_tab_expand(1, 1)

        # Unmount externally, remount with Cockpit

        m.execute("umount /run/foo")
        b.click(fsys_tab + " button:contains(Mount now)")
        b.wait_not_present(fsys_tab + " button:contains(Mount now)")
        self.wait_mounted(1, 1)

        # Unmount externally, adjust fstab with Cockpit

        m.execute("umount /run/foo")
        b.click(fsys_tab + " button:contains(Do not mount automatically on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Do not mount automatically on boot)")

        # Mount externally, unmount with Cockpit

        m.execute("mount /run/foo")
        b.click(fsys_tab + " button:contains(Unmount now)")
        b.wait_not_present(fsys_tab + " button:contains(Unmount now)")

        # Mount externally, adjust fstab with Cockpit

        m.execute("mount /run/foo")
        b.click(fsys_tab + " button:contains(Mount also automatically on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Mount also automatically on boot)")

        # Move mount point externally, move back with Cockpit

        m.execute("umount /run/foo")
        m.execute("mkdir -p /run/bar && mount /dev/sda /run/bar")
        b.click(fsys_tab + " button:contains(Mount on /run/foo now)")
        b.wait_not_present(fsys_tab + " button:contains(Mount on /run/foo now)")

        # Move mount point externally, adjust fstab with Cockpit

        m.execute("umount /run/foo")
        m.execute("mkdir -p /run/bar && mount /dev/sda /run/bar")
        b.click(fsys_tab + " button:contains(Mount automatically on /run/bar on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Mount automatically on /run/bar on boot)")

        # Using noauto,x-systemd.automount should not show a warning
        m.execute("sed -i -e 's/auto defaults/auto noauto/' /etc/fstab")
        b.wait_visible(fsys_tab + " button:contains(Mount also automatically on boot)")
        m.execute("sed -i -e 's/noauto/noauto,x-systemd.automount/' /etc/fstab")
        b.wait_not_present(fsys_tab + " button:contains(Mount also automatically on boot)")

    def testEncryptedMountingHelp(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Add a disk
        m.add_disk("50M", serial="MYDISK")
        dev = "/dev/disk/by-id/scsi-0QEMU_QEMU_HARDDISK_MYDISK"

        b.wait_in_text("#drives", "MYDISK")
        b.click('#drives .sidepanel-row:contains("MYDISK")')
        b.wait_visible('#storage-detail')

        # Format it with encryption

        self.content_row_action(1, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("name", "FILESYSTEM")
        self.dialog_set_val("crypto", self.default_crypto_type)
        self.dialog_set_val("crypto_options", "xxx")
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_set_val("passphrase2", "vainu-reku-toma-rolle-kaja")
        self.dialog_set_val("mount_point", "/run/foo")
        self.dialog_apply()
        self.dialog_wait_close()
        self.content_row_wait_in_col(1, 2, "ext4 filesystem (encrypted)")
        self.content_tab_wait_in_info(1, 1, "Name", "FILESYSTEM")
        self.content_tab_wait_in_info(1, 1, "Mount point", "/run/foo")

        # Unmount and lock externally, unlock and remount with Cockpit

        m.execute("umount /run/foo")
        m.execute(f"udisksctl lock -b {dev}")
        # wait until the UI updated to the locking
        self.content_tab_wait_in_info(1, 2, "Cleartext device", "-")
        fsys_tab = self.content_tab_expand(1, 1)
        b.click(fsys_tab + " button:contains(Mount now)")
        self.dialog({"passphrase": "vainu-reku-toma-rolle-kaja"})
        b.wait_not_present(fsys_tab + " button:contains(Mount now)")
        self.wait_mounted(1, 1)

        # Unmount and lock externally, adjust fstab with Cockpit

        m.execute("umount /run/foo")
        m.execute(f"udisksctl lock -b {dev}")
        # wait until the UI updated to the locking
        self.content_tab_wait_in_info(1, 2, "Cleartext device", "-")
        fsys_tab = self.content_tab_expand(1, 1)
        b.click(fsys_tab + " button:contains(Do not mount automatically on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Do not mount automatically on boot)")

        # Unlock and mount externally, unmount and lock with Cockpit

        m.execute(f"echo -n vainu-reku-toma-rolle-kaja | udisksctl unlock --key-file /dev/stdin -b {dev}")
        m.execute("mount /run/foo")
        b.click(fsys_tab + " button:contains(Unmount now)")
        b.wait_not_present(fsys_tab + " button:contains(Unmount now)")

        # Unlock and mount externally, adjust fstab with Cockpit

        m.execute(f"echo -n vainu-reku-toma-rolle-kaja | udisksctl unlock --key-file /dev/stdin -b {dev}")
        m.execute("mount /run/foo")
        b.click(fsys_tab + " button:contains(Mount also automatically on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Mount also automatically on boot)")

        # Add noauto to crypttab (but not fstab), remove with Cockpit

        m.execute("sed -i -e 's/xxx/xxx,noauto/' /etc/crypttab")
        b.click(fsys_tab + " button:contains(Unlock automatically on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Unlock automatically on boot)")

        # Add noauto to crypttab (but not fstab), add also to fstab with Cockpit

        m.execute("sed -i -e 's/xxx/xxx,noauto/' /etc/crypttab")
        b.click(fsys_tab + " button:contains(Do not mount automatically on boot)")
        b.wait_not_present(fsys_tab + " button:contains(Do not mount automatically on boot)")

    def testDuplicateMountPoints(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Quickly make two logical volumes
        m.add_disk("50M", serial="MYDISK")
        b.wait_in_text("#drives", "MYDISK")
        m.execute("vgcreate test /dev/sda && lvcreate test -n one -L 20M && lvcreate test -n two -L 20M")
        b.click('#devices .sidepanel-row:contains("test")')
        b.wait_visible("#storage-detail")
        self.content_row_wait_in_col(1, 1, "one")
        self.content_row_wait_in_col(2, 1, "two")

        # Encrypt and format the first and give it /run/data as the mount point
        self.content_row_action(1, "Format")
        self.dialog({"type": "ext4",
                     "crypto": self.default_crypto_type,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja",
                     "mount_point": "/run/data"})
        self.content_row_wait_in_col(1, 2, "ext4 filesystem")
        self.content_tab_wait_in_info(1, 2, "Mount point", "/run/data")

        # Format the second and also try to use /run/data as the mount point
        self.content_row_action(2, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("mount_point", "/run/data")
        self.dialog_apply()
        self.dialog_wait_error("mount_point", "Mount point is already used for /dev/test/one")
        self.dialog_cancel()
        self.dialog_wait_close()

        # Format the first and re-use /run/data as the mount point.
        # This should be allowed.
        self.content_dropdown_action(1, "Format")
        self.dialog({"type": "ext4",
                     "mount_point": "/run/data"})
        self.content_row_wait_in_col(1, 2, "ext4 filesystem")
        self.content_tab_wait_in_info(1, 2, "Mount point", "/run/data")

    def testFirstMount(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        m.add_disk("50M", serial="MYDISK")
        b.click("#drives .sidepanel-row:contains(MYDISK)")
        b.wait_visible("#storage-detail")

        m.execute("mkfs.ext4 /dev/sda")
        self.content_row_wait_in_col(1, 2, "ext4 filesystem")

        m.execute("! grep /run/data /etc/fstab")
        self.content_row_action(1, "Mount")
        self.dialog({"mount_point": "/run/data",
                     "mount_options.extra": "x-foo"})
        m.execute("grep /run/data /etc/fstab")
        m.execute("grep 'x-foo' /etc/fstab")

    def testNeverAuto(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Add a disk and format it with luks and a filesystem, but with "Never unlock at boot"

        m.add_disk("50M", serial="MYDISK")
        b.wait_in_text("#drives", "MYDISK")
        b.click('.sidepanel-row:contains("MYDISK")')
        b.wait_visible("#storage-detail")

        self.content_row_action(1, "Format")
        self.dialog({"type": "ext4",
                     "crypto": self.default_crypto_type,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja",
                     "mount_options.auto": True,
                     "mount_options.never_auto": True,
                     "mount_point": "/run/foo"})
        self.content_row_wait_in_col(1, 2, "ext4 filesystem (encrypted)")
        self.content_tab_wait_in_info(1, 1, "Mount point", "never mounted at boot")

        # The filesystem should be mounted but have the "noauto"
        # option in both fstab and crypttab
        self.wait_mounted(1, 1)
        self.assertIn("noauto", m.execute("grep /run/foo /etc/fstab || true"))
        self.assertNotEqual(m.execute("grep noauto /etc/crypttab"), "")

        # Unmounting should keep the noauto option, as always
        self.content_dropdown_action(1, "Unmount")
        self.content_tab_wait_in_info(1, 1, "Mount point", "The filesystem is not mounted")
        self.assertIn("noauto", m.execute("grep /run/foo /etc/fstab || true"))
        self.assertNotEqual(m.execute("grep noauto /etc/crypttab"), "")

        # Mounting should also keep the "noauto", but it should not show up in the extra options
        self.content_row_action(1, "Mount")
        self.dialog_check({"mount_options.extra": False})
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_apply()
        self.dialog_wait_close()
        self.wait_mounted(1, 1)
        self.assertIn("noauto", m.execute("grep /run/foo /etc/fstab || true"))
        self.assertNotEqual(m.execute("grep noauto /etc/crypttab"), "")

        # As should updating the mount information
        self.content_tab_info_action(1, 1, "Mount point", wrapped=True)
        self.dialog_check({"mount_options.extra": False})
        self.dialog_apply()
        self.dialog_wait_close()
        self.assertIn("noauto", m.execute("grep /run/foo /etc/fstab || true"))
        self.assertNotEqual(m.execute("grep noauto /etc/crypttab"), "")

        # Removing "noauto" from fstab but not from crypttab externally should show a warning
        m.execute("sed -i -e 's/noauto//' /etc/fstab")
        fsys_tab = self.content_tab_expand(1, 1)
        b.wait_in_text(fsys_tab, "The filesystem is configured to be automatically mounted on boot but its encryption container will not be unlocked at that time.")
        b.click(fsys_tab + " button:contains(Do not mount automatically)")
        b.wait_not_present(fsys_tab + " button:contains(Do not mount automatically)")


if __name__ == '__main__':
    test_main()
